package org.example.dobs.demo.flink.wc.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import source.custom.MySourceFunction_V1;

/**
 * Demonstrates {@code connect} on two Flink data streams.
 *
 * <p>{@code union} requires both streams to carry the same element type,
 * whereas {@code connect} allows the two streams to carry different element
 * types; each side is then handled by its own method of a
 * {@link CoMapFunction}.
 */
public class ConnectionDemo {
    /**
     * Builds and runs the demo job: a {@code Long} stream is connected with a
     * {@code String} stream, both sides are mapped to {@code String}, and the
     * merged result is printed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 — presumably to keep the demo's console output easy to follow.
        env.setParallelism(1);

        // First input: the raw Long elements produced by the custom source.
        DataStreamSource<Long> inputDataStream1 = env.addSource(new MySourceFunction_V1());

        // Second input: a second instance of the same source, converted to String
        // so the two connected streams deliberately have different element types.
        SingleOutputStreamOperator<String> inputDataStream2 = env.addSource(new MySourceFunction_V1())
                .map(new MapFunction<Long, String>() {
                    @Override
                    public String map(Long aLong) throws Exception {
                        return "str" + aLong;
                    }
                });

        // Unlike union, connect does NOT require inputDataStream1 and
        // inputDataStream2 to have the same element type.
        ConnectedStreams<Long, String> connectStream = inputDataStream1.connect(inputDataStream2);

        // Both branches produce String, so the output type is declared as String
        // rather than Object: downstream operators stay statically typed.
        SingleOutputStreamOperator<String> result = connectStream.map(new CoMapFunction<Long, String, String>() {
            /** Handles elements of the first (Long) stream. */
            @Override
            public String map1(Long aLong) throws Exception {
                System.out.println("map1 " + aLong);
                return String.valueOf(aLong);
            }

            /** Handles elements of the second (String) stream. */
            @Override
            public String map2(String s) throws Exception {
                System.out.println("map2 " + s);
                return s;
            }
        });
        result.print();

        env.execute(ConnectionDemo.class.getSimpleName());
    }

}
